package com.atguigu.flink.sql;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: reads text lines from a socket, converts them to {@link WaterSensor} records and
 * writes them to a Kafka topic through a Flink SQL table backed by the Kafka connector.
 *
 * <p>Flow of {@link #main(String[])}:
 * <ol>
 *   <li>the socket stream is registered as the temporary view {@code t2};</li>
 *   <li>the Kafka-backed table {@code t1} is declared via DDL;</li>
 *   <li>every row of {@code t2} is inserted into {@code t1}.</li>
 * </ol>
 *
 * <p>Created by Smexy on 2023/2/5
 */
public class Demo5_WriteKafka {

    /**
     * DDL declaring the sink table {@code t1}: connector {@code kafka}, topic {@code topicA},
     * broker {@code hadoop102:9092}, round-robin sink partitioner, JSON value format.
     * NOTE(review): the columns (id, ts, vc) are presumably the fields of WaterSensor,
     * in that order, since the insert uses {@code select *} - confirm against the POJO.
     */
    private static final String CREATE_KAFKA_TABLE_SQL =
        "create table t1( id string,ts bigint ,vc int ) with("
            + "                    'connector' = 'kafka' ,"
            + "                    'topic' = 'topicA' , "
            + "                    'properties.bootstrap.servers' = 'hadoop102:9092' , "
            + "                    'sink.partitioner' = 'round-robin' , "
            + "                    'value.format' = 'json'  "
            + "                    )";

    /** Statement that copies every row of the view {@code t2} into the table {@code t1}. */
    private static final String INSERT_INTO_KAFKA_SQL = "insert into t1 select * from t2";

    /**
     * Entry point: builds the environments, registers the source view, declares the Kafka
     * table and executes the insert.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        registerSensorView(streamEnv, tableEnv);

        // Declare the sink table first (a write to the catalog) ...
        tableEnv.executeSql(CREATE_KAFKA_TABLE_SQL);

        // ... then run the insert. No explicit streamEnv.execute() call is made here;
        // the INSERT statement is executed through executeSql alone.
        tableEnv.executeSql(INSERT_INTO_KAFKA_SQL);
    }

    /**
     * Reads text from the socket {@code hadoop103:8888}, maps each line with
     * {@link WaterSensorMapFunction} and exposes the resulting stream to SQL as view {@code t2}.
     *
     * @param streamEnv environment used to create the socket source
     * @param tableEnv  table environment in which the view is registered
     */
    private static void registerSensorView(StreamExecutionEnvironment streamEnv,
                                           StreamTableEnvironment tableEnv) {
        SingleOutputStreamOperator<WaterSensor> sensorStream =
            streamEnv.socketTextStream("hadoop103", 8888).map(new WaterSensorMapFunction());

        Table sensorTable = tableEnv.fromDataStream(sensorStream);

        // Give the table a name so that SQL statements can refer to it.
        tableEnv.createTemporaryView("t2", sensorTable);
    }
}
